package com.example.demo.stream.sink;

import cn.hutool.json.JSONUtil;
import com.example.demo.env.SpringContextHolder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.JedisCluster;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Collectors;

/**
 * 写出到mySQL
 *
 * @author: wbx
 */
public class RedisSink<In> extends RichSinkFunction<In> {

    /**
     * redis操作对象
     */
    private JedisCluster jedis;

    /**
     * redis缓存key
     */
    private final String redisKey;


    public RedisSink(String redisKey) {
        this.redisKey = redisKey;
    }

    @Override
    public void open(Configuration parameters) {
        // 获取上下文中的 jedis对象
        jedis = SpringContextHolder.getBean(JedisCluster.class);
    }

    @Override
    public void close() throws Exception {
        System.out.println("RedisSink对象关闭。。。。");
        super.close();
        if (Objects.nonNull(jedis))
            jedis.close();
    }

    @Override
    public void invoke(In value, Context context) throws NoSuchFieldException, IllegalAccessException {
        // 获取类的所有属性
        Field[] declaredFields = value.getClass().getDeclaredFields();
        String jsonStr = JSONUtil.toJsonStr(value);
        // 如果属性中包含id字段
        if(Arrays.stream(declaredFields).map(Field::getName).collect(Collectors.toList()).contains("id")) {
            // 获取类中的id属性
            Field idField = value.getClass().getDeclaredField("id");
            // 获取id属性的值
            String id =String.valueOf(idField.get(value));
            jedis.hset(redisKey, id, jsonStr);
        } else {
            jedis.rpush(redisKey, jsonStr);
        }
    }
}
